/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.data.service.segments.records.merge;

import java.util.Date;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.core.util.SecurityUtils;
import org.unidata.mdm.data.context.ExtendedAttributesAwareContext;
import org.unidata.mdm.data.context.MergeDuplicatesContext;
import org.unidata.mdm.data.context.ReadWriteDataContext;
import org.unidata.mdm.data.context.ReadWriteTimelineContext;
import org.unidata.mdm.data.context.RecordIdentityContext;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.data.service.impl.RecordComposerComponent;
import org.unidata.mdm.data.type.data.OriginRecord;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.data.type.keys.RecordKeys.RecordKeysBuilder;
import org.unidata.mdm.system.type.pipeline.PipelineInput;
import org.unidata.mdm.system.type.pipeline.Point;
import org.unidata.mdm.system.type.pipeline.Start;
import org.unidata.mdm.system.type.runtime.MeasurementPoint;

/**
 * @author Mikhail Mikhailov on Nov 10, 2019
 */
@Component(RecordMergeTimelineExecutor.SEGMENT_ID)
public class RecordMergeTimelineExecutor extends Point<PipelineInput> {
    /**
     * This segment ID.
     */
    public static final String SEGMENT_ID = DataModule.MODULE_ID + "[RECORD_MERGE_TIMELINE]";
    /**
     * Localized message code.
     */
    public static final String SEGMENT_DESCRIPTION = DataModule.MODULE_ID + ".record.merge.timeline.description";
    /**
     * The composer.
     */
    @Autowired
    private RecordComposerComponent recordComposerComponent;
    /**
     * Constructor.
     * @param id
     * @param description
     */
    public RecordMergeTimelineExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }
    /**
     * {@inheritDoc}
     */
    @SuppressWarnings("unchecked")
    @Override
    public void point(PipelineInput ctx) {

        MeasurementPoint.start();
        try {

            RecordIdentityContext iCtx = (RecordIdentityContext) ctx;
            ReadWriteDataContext<OriginRecord> dCtx = (ReadWriteDataContext<OriginRecord>) ctx;
            ReadWriteTimelineContext<OriginRecord> tCtx = (ReadWriteTimelineContext<OriginRecord>) ctx;
            MergeDuplicatesContext<OriginRecord> mCtx = (MergeDuplicatesContext<OriginRecord>) ctx;
            ExtendedAttributesAwareContext aCtx = (ExtendedAttributesAwareContext) ctx;

            // 1. Merge duplicates timeline to master timeline
            RecordKeys keys = iCtx.keys();
            Date ts = dCtx.timestamp();
            String user = SecurityUtils.getCurrentUserName();

            // 2. Add origins to keys
            List<RecordKeys> duplicateKeys = mCtx.duplicateKeys();
            RecordKeys masterKeys = iCtx.keys();
            RecordKeysBuilder rkb = RecordKeys.builder(masterKeys)
                    .updateDate(ts)
                    .updatedBy(user);

            duplicateKeys.forEach(dk -> rkb.supplementaryKeys(dk.getSupplementaryKeys()));

            Timeline<OriginRecord> next = tCtx.currentTimeline();
            for (Timeline<OriginRecord> dt : mCtx.duplicateTimelines().values()) {
                next = next.merge(dt);
            }

            // Original keys traditionally remain unchanged until the end of the PL
            // Set the new keys state to the NEXT timeline
            next.setKeys(rkb.build());
            next.forEach(i -> recordComposerComponent.toEtalon(keys, i, ts, user, aCtx.isIncludeWinners()));

            tCtx.nextTimeline(next);
        } finally {
            MeasurementPoint.stop();
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean supports(Start<?, ?> start) {
        return RecordIdentityContext.class.isAssignableFrom(start.getInputTypeClass())
            && MergeDuplicatesContext.class.isAssignableFrom(start.getInputTypeClass())
            && ReadWriteTimelineContext.class.isAssignableFrom(start.getInputTypeClass())
            && ReadWriteDataContext.class.isAssignableFrom(start.getInputTypeClass())
            && ExtendedAttributesAwareContext.class.isAssignableFrom(start.getInputTypeClass());
    }
}
